/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.flink.connector.file.src;

import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.api.connector.source.SourceSplit;
import org.apache.flink.connector.file.src.util.CheckpointedPosition;
import org.apache.flink.core.fs.FileStatus;
import org.apache.flink.core.fs.Path;
import org.apache.flink.util.StringUtils;

import javax.annotation.Nullable;

import java.io.Serializable;
import java.util.Arrays;
import java.util.Optional;

import static org.apache.flink.util.Preconditions.checkArgument;
import static org.apache.flink.util.Preconditions.checkNotNull;

/**
 * A {@link SourceSplit} that represents a file, or a region of a file.
 *
 * <p>The split has an offset and an end, which defines the region of the file represented by the
 * split. For splits representing the while file, the offset is zero and the length is the file
 * size.
 *
 * <p>The split may furthermore have a "reader position", which is the checkpointed position from a
 * reader previously reading this split. This position is typically null when the split is assigned
 * from the enumerator to the readers, and is non-null when the readers checkpoint their state in a
 * file source split.
 *
 * <p>This class is {@link Serializable} for convenience. For Flink's internal serialization (both
 * for RPC and for checkpoints), the {@link FileSourceSplitSerializer} is used.
 */
@PublicEvolving
public class FileSourceSplit implements SourceSplit, Serializable {

    private static final long serialVersionUID = 1L;

    private static final String[] NO_HOSTS = StringUtils.EMPTY_STRING_ARRAY;

    /** The unique ID of the split. Unique within the scope of this source. */
    private final String id;

    /** The path of the file referenced by this split. */
    private final Path filePath;

    /** The position of the first byte in the file to process. */
    private final long offset;

    /** The number of bytes in the file to process. */
    private final long length;

    /** The modification time of the file, from {@link FileStatus#getModificationTime()}. */
    private final long fileModificationTime;

    /** The file size in bytes, from {@link FileStatus#getLen()}. */
    private final long fileSize;

    /**
     * The names of the hosts storing this range of the file. Empty, if no host information is
     * available.
     */
    private final String[] hostnames;

    /** The precise reader position in the split, to resume from. */
    @Nullable private final CheckpointedPosition readerPosition;

    /**
     * The splits are frequently serialized into checkpoints. Caching the byte representation makes
     * repeated serialization cheap. This field is used by {@link FileSourceSplitSerializer}.
     */
    @Nullable transient byte[] serializedFormCache;

    // --------------------------------------------------------------------------------------------

    /**
     * Constructs a split with host information.
     *
     * @param id The unique ID of this source split.
     * @param filePath The path to the file.
     * @param offset The start (inclusive) of the split's rage in the file.
     * @param length The number of bytes in the split (starting from the offset)
     * @param fileModificationTime The modification time of the file
     * @param fileSize The size of the full file
     */
    public FileSourceSplit(
            String id,
            Path filePath,
            long offset,
            long length,
            long fileModificationTime,
            long fileSize) {
        this(id, filePath, offset, length, fileModificationTime, fileSize, NO_HOSTS);
    }

    /**
     * Constructs a split with host information.
     *
     * @param filePath The path to the file.
     * @param offset The start (inclusive) of the split's rage in the file.
     * @param length The number of bytes in the split (starting from the offset)
     * @param fileModificationTime The modification time of the file
     * @param fileSize The size of the full file
     * @param hostnames The hostnames of the nodes storing the split's file range.
     */
    public FileSourceSplit(
            String id,
            Path filePath,
            long offset,
            long length,
            long fileModificationTime,
            long fileSize,
            String... hostnames) {
        this(id, filePath, offset, length, fileModificationTime, fileSize, hostnames, null, null);
    }

    /**
     * Constructs a split with host information.
     *
     * @param filePath The path to the file.
     * @param offset The start (inclusive) of the split's rage in the file.
     * @param length The number of bytes in the split (starting from the offset)
     * @param fileModificationTime The modification time of the file
     * @param fileSize The size of the full file
     * @param hostnames The hostnames of the nodes storing the split's file range.
     */
    public FileSourceSplit(
            String id,
            Path filePath,
            long offset,
            long length,
            long fileModificationTime,
            long fileSize,
            String[] hostnames,
            @Nullable CheckpointedPosition readerPosition) {
        this(
                id,
                filePath,
                offset,
                length,
                fileModificationTime,
                fileSize,
                hostnames,
                readerPosition,
                null);
    }

    /** @deprecated You should use {@link #FileSourceSplit(String, Path, long, long, long, long)} */
    @Deprecated
    public FileSourceSplit(String id, Path filePath, long offset, long length) {
        this(id, filePath, offset, length, 0, 0, NO_HOSTS);
    }

    /**
     * @deprecated You should use {@link #FileSourceSplit(String, Path, long, long, long, long,
     *     String...)}
     */
    @Deprecated
    public FileSourceSplit(
            String id, Path filePath, long offset, long length, String... hostnames) {
        this(id, filePath, offset, length, 0, 0, hostnames, null, null);
    }

    /**
     * @deprecated You should use {@link #FileSourceSplit(String, Path, long, long, long, long,
     *     String[], CheckpointedPosition)}
     */
    @Deprecated
    public FileSourceSplit(
            String id,
            Path filePath,
            long offset,
            long length,
            String[] hostnames,
            @Nullable CheckpointedPosition readerPosition) {
        this(id, filePath, offset, length, 0, 0, hostnames, readerPosition, null);
    }

    /**
     * Package private constructor, used by the serializers to directly cache the serialized form.
     */
    FileSourceSplit(
            String id,
            Path filePath,
            long offset,
            long length,
            long fileModificationTime,
            long fileSize,
            String[] hostnames,
            @Nullable CheckpointedPosition readerPosition,
            @Nullable byte[] serializedForm) {
        this.fileModificationTime = fileModificationTime;
        this.fileSize = fileSize;

        checkArgument(offset >= 0, "offset must be >= 0");
        checkArgument(length >= 0, "length must be >= 0");
        checkNoNullHosts(hostnames);

        this.id = checkNotNull(id);
        this.filePath = checkNotNull(filePath);
        this.offset = offset;
        this.length = length;
        this.hostnames = hostnames;
        this.readerPosition = readerPosition;
        this.serializedFormCache = serializedForm;
    }

    // ------------------------------------------------------------------------
    //  split properties
    // ------------------------------------------------------------------------

    @Override
    public String splitId() {
        return id;
    }

    /** Gets the file's path. */
    public Path path() {
        return filePath;
    }

    /**
     * Returns the start of the file region referenced by this source split. The position is
     * inclusive, the value indicates the first byte that is part of the split.
     */
    public long offset() {
        return offset;
    }

    /** Returns the number of bytes in the file region described by this source split. */
    public long length() {
        return length;
    }

    /** Returns the modification time of the file, from {@link FileStatus#getModificationTime()}. */
    public long fileModificationTime() {
        return fileModificationTime;
    }

    /** Returns the full file size in bytes, from {@link FileStatus#getLen()}. */
    public long fileSize() {
        return fileSize;
    }

    /**
     * Gets the hostnames of the nodes storing the file range described by this split. The returned
     * array is empty, if no host information is available.
     *
     * <p>Host information is typically only available on specific file systems, like HDFS.
     */
    public String[] hostnames() {
        return hostnames;
    }

    /**
     * Gets the (checkpointed) position of the reader, if set. This value is typically absent for
     * splits when assigned from the enumerator to the readers, and present when the splits are
     * recovered from a checkpoint.
     */
    public Optional<CheckpointedPosition> getReaderPosition() {
        return Optional.ofNullable(readerPosition);
    }

    /**
     * Creates a copy of this split where the checkpointed position is replaced by the given new
     * position.
     *
     * <p><b>IMPORTANT:</b> Subclasses that add additional information to the split must override
     * this method to return that subclass type. This contract is enforced by checks in the file
     * source implementation. We did not try to enforce this contract via generics in this split
     * class, because it leads to very ugly and verbose use of generics.
     */
    public FileSourceSplit updateWithCheckpointedPosition(@Nullable CheckpointedPosition position) {
        return new FileSourceSplit(
                id, filePath, offset, length, fileModificationTime, fileSize, hostnames, position);
    }

    // ------------------------------------------------------------------------
    //  utils
    // ------------------------------------------------------------------------

    @Override
    public String toString() {
        final String hosts =
                hostnames.length == 0 ? "(no host info)" : " hosts=" + Arrays.toString(hostnames);
        return String.format(
                "FileSourceSplit: %s [%d, %d) %s ID=%s position=%s",
                filePath, offset, offset + length, hosts, id, readerPosition);
    }

    private static void checkNoNullHosts(String[] hosts) {
        checkNotNull(hosts, "hostnames array must not be null");
        for (String host : hosts) {
            checkArgument(host != null, "the hostnames must not contain null entries");
        }
    }
}
